package com.alinesno.cloud.compoment.kafka.listener;

import java.util.Optional;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import com.alinesno.cloud.compoment.kafka.KafkaComponentService;
import com.alinesno.cloud.compoment.kafka.enable.EnableComponentKafka;

/**
 * 应用启动
 * 
 * @author LuoAnDong
 * @since 2019年4月9日 下午3:27:45
 */
@SpringBootApplication
@EnableComponentKafka
public class KafkaApplicationListener implements CommandLineRunner {

	public static Logger log = LoggerFactory.getLogger(KafkaApplicationListener.class);

	@Autowired
	private KafkaComponentService kafkaComponentService;
	
	public static void main(String[] args) {
		SpringApplication.run(KafkaApplicationListener.class, args);
	}

	@Override
	public void run(String... args) throws Exception {
		log.info("running , kafkaComponentService:{}", kafkaComponentService);
		
		kafkaComponentService.blockSendMessage("test", "this is a test of kafka" + System.currentTimeMillis());
		kafkaComponentService.blockSendMessage("test-01", "this is a test of kafka" + System.currentTimeMillis());
	}
	
	@Component
	class AskListener {

		@KafkaListener(topicPattern = "test-01", containerFactory = "kafkaManualAckListenerContainerFactory")
		public void listen(ConsumerRecord<?, ?> record, @Payload String data ,
				@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
				@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
				@Header(KafkaHeaders.RECEIVED_TOPIC) String topic, @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts,
				Acknowledgment ack) {
			
			log.info("data:{} , key:{} , partition:{} , topic:{} , ts:{}" ,data , key , partition , topic , ts );
			
			Optional<?> kafkaMessage = Optional.ofNullable(record.value());
			if (kafkaMessage.isPresent()) {
				Object message = kafkaMessage.get();
				log.info("message =topic：" + topic + ", " + message);
			}

			ack.acknowledge();
		}
	}
	
	@Component
	class AutoListener {

		@KafkaListener(topicPattern = "test")
		public void listen(ConsumerRecord<?, ?> record, @Payload String data ,
				@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
				@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
				@Header(KafkaHeaders.RECEIVED_TOPIC) String topic, @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts
				) {
			
			log.info("data:{} , key:{} , partition:{} , topic:{} , ts:{}" ,data , key , partition , topic , ts );
			
			
			Optional<?> kafkaMessage = Optional.ofNullable(record.value());
			if (kafkaMessage.isPresent()) {
				Object message = kafkaMessage.get();
				log.info("message =topic：" + topic + ", " + message);
			}
		}
	}

}